package cn.young.dev.util;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import kafka.serializer.StringEncoder;

import java.util.Properties;

/**
 * @Auther: Ryan
 * @Date: 2022/1/21 14:05
 * @Description: 发送Kafka消息
 */
public class KafkaProducer {

    private final Producer<Integer, String> producer;

    public KafkaProducer(String zookeeperConnect, String kafkaBrokerList){
        Properties properties = new Properties();
        properties.put("zookeeper.connect", zookeeperConnect);//声明zk
        properties.put("serializer.class", StringEncoder.class.getName());
        properties.put("metadata.broker.list", kafkaBrokerList);// 声明kafka broker
        producer = new Producer<>(new ProducerConfig(properties));
    }

    public void send(String topic, String message) {
        producer.send(new KeyedMessage<>(topic, message));
    }

}

